package com.xiguthings.xiniu.iot.trigger.worker.kafka;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.xiguthings.iot.kafka.KafkaClienAstract;
import com.xiguthings.iot.kafka.consumer.Consumer;
import com.xiguthings.iot.kafka.consumer.base.KafkaConsumerConfig;
import com.xiguthings.iot.kafka.producer.DataProducer;
import com.xiguthings.iot.kafka.producer.base.KafkaProducerConfig;
import com.xiguthings.xiniu.iot.trigger.worker.controller.DataContrller;

@Component
public class KafkaClient extends KafkaClienAstract {
	// private final Logger LOGGER = LoggerFactory.getLogger(KafkaClient.class);

	@Autowired
	private DataContrller dataContrller;

	private DataProducer dataProducer;

	@Value("${kafka.consumer.topic}")
	private String topic;
	@Value("${kafka.consumer.group-id}")
	private String groupId;
	@Value("${kafka_consumer_bootstrap_servers}")
	public String consumerServers;
	@Value("${kafka.enable.auto.commit}")
	public boolean enableAutoCommit;
	@Value("${kafka.session.timeout.ms}")
	public String sessionTimeout;
	@Value("${kafka.auto.commit.interval.ms}")
	public String autoCommitInterval;
	@Value("${kafka.auto.offset.reset}")
	public String autoOffsetReset;

	@Value("${kafka.producer.acks}")
	public String acks;
	@Value("${kafka_producer_bootstrap_servers}")
	public String producerServers;
	@Value("${kafka.producer.retries}")
	public int retries;
	@Value("${kafka.producer.buffer.memory}")
	public long bufferMemorry;
	@Value("${kafka.producer.batch.size}")
	public int bacthSize;
	@Value("${kafka.producer.linger.ms}")
	public long lingerMs;

	@PostConstruct
	private void action() {

		// 创建生产者
		KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(acks, producerServers, retries, bufferMemorry,
				bacthSize, lingerMs);
		this.dataProducer = new DataProducer(kafkaProducerConfig);

		// 开启消费线程
		Thread consumerThread = new Thread(new Runnable() {
			@Override
			public void run() {
				KafkaConsumerConfig kafkaConsumerConfig = new KafkaConsumerConfig(consumerServers, enableAutoCommit,
						sessionTimeout, autoCommitInterval, autoOffsetReset);
				// 创建消费者
				Consumer consumer = new Consumer(topic, groupId, kafkaConsumerConfig);
				consumMessage(consumer, 100);
			}
		});
		consumerThread.start();
	}

	@Override
	public void handlerMessage(Consumer consumer, String dataStr) {
		dataContrller.receiveData(dataStr);
	}

	public void produceData(String value, String topic, Integer partition) {
		super.produceData(dataProducer, value, topic, partition);
	}
}
